package org.example.table;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.*;
import org.example.domin.Result;
import org.example.domin.TobjLdHour;
import org.example.utils.JDBCUtils;
import scala.Tuple2;
import scala.Tuple4;

import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import static org.example.concost.Concost.*;
import static org.example.utils.TimeConversion.convertUToTime;

/**
 * Spark ETL job: joins metering-point master data (MySQL) with HBase-backed
 * curve data in Hive, pivots the per-minute curve columns into rows, merges
 * the per-phase values and loads the result into the DWS hour table.
 *
 * @author lwc
 * @date 2023/12/21 20:41
 */
@Slf4j
public class MPCurveDws {

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("e_mp_pf_curveTable");
        SparkSession spark = null;
        if (args.length >= 1) {
            spark = SparkSession.builder()
                    .appName("HBase to MySQL Sync")
                    .config(sparkConf)
//                    .config("spark.master", "local[*]")  // 使用本地模式，[*]表示使用所有可用的核心
                    .enableHiveSupport()
                    .getOrCreate();
            log.info("加上了local[*]");
        } else {
            spark = SparkSession.builder()
                    .appName("HBase to MySQL Sync")
                    .config(sparkConf)
                    .config("spark.master", "local[*]")  // 使用本地模式，[*]表示使用所有可用的核心
                    .enableHiveSupport()
                    .getOrCreate();
        }
        Data(spark, sparkConf);
        spark.stop();
    }


    public static void Data(SparkSession spark, SparkConf sparkConf) {
        log.info("=================开始查询E_MP_U_CURVE数据===================");
        String startDate = DateUtil.beginOfDay(DateUtil.date()).toString("yyyyMMdd");
        String endDate = DateUtil.offsetDay(DateUtil.parse(DateUtil.beginOfDay(DateUtil.date()).toDateStr()), -1).toString("yyyyMMdd");

        String table = sparkConf.get("spark.app.table", "e_mp_pf_curve");
        String startTime = sparkConf.get("spark.app.startTime", endDate);
        String endTime = sparkConf.get("spark.app.endTime", startDate);
        String dwsMeter = sparkConf.get("spark.app.dwsMeter", "t_bus_meterpointmng");
        String dwsTable = sparkConf.get("spark.app.dwsTable", "t_obj_202312ld_hour");
        String url = sparkConf.get("spark.app.url", "jdbc:mysql://localhost:3306/test");
        String username = sparkConf.get("spark.app.username", "root");
        String password = sparkConf.get("spark.app.password", "9XME3z94xs9nhCj");

        log.info("mysql 连接的参数连接：{},表名:{},用户:{}，密码：{}", url, dwsMeter, username, password);
        log.info("hive 参数，表名：{},开始时间：{},结束时间：{}", table, startDate, endDate);

        Dataset<Row> databases = spark.sql("show databases");
        databases.show();
        try {
            //查询mysql meter
            Dataset<Row> mysqlData = spark.read()
                    .format("jdbc")
                    .option("url", url)
                    .option("dbtable", "(select dev_id from " + dwsMeter + ") as temp")
                    .option("user", username)
                    .option("password", password)
                    .load();
            mysqlData.show();

            //查询hive
            StringBuilder hiveSql = new StringBuilder();
            hiveSql.append("select * from ods_amr20_hbase.").append(table).append(" where ds >= '")
                    .append(startTime).append("'").append(" and ds <='").append(endTime).append("'");
            log.info("执行的hiveSql为：{}", hiveSql);
            Dataset<Row> hiveData = spark.sql(hiveSql.toString());
            //两个表进行join  操作
            Dataset<Row> rowDataset = mysqlData.join(hiveData, mysqlData.col("dev_id").equalTo(hiveData.col("dev_id")));

            JavaRDD<TobjLdHour> tobjLdHourVaJavaRDD = rowDataset.javaRDD().flatMap((FlatMapFunction<Row, TobjLdHour>)
                    row -> convertRowToDevTimeData(row, table).iterator());
            //将数据进行合并
            JavaPairRDD<Tuple4<Long, String, String, String>, Tuple4<Double, Double, Double, Double>> pairedRDD = tobjLdHourVaJavaRDD.mapToPair(data -> {
                // 创建键值对 (key, value)，其中 key 为 (dev_id, time)，value 为 Tuple3<data1, data2, data3>
                Tuple4<Long, String, String, String> key = new Tuple4<>(data.getF_measurement_points(),
                        data.getF_data_collection_time(), data.getF_key_name(), data.getF_data_input_time());
                Tuple4<Double, Double, Double, Double> value = new Tuple4<>(data.getData_vs(), data.getData_va(), data.getData_vb(), data.getData_vc());
                return new Tuple2<>(key, value);
            });

            // 对相同的键进行合并处理
            JavaPairRDD<Tuple4<Long, String, String, String>, Tuple4<Double, Double, Double, Double>> mergedRDD = pairedRDD.reduceByKey(new
                                                                                                                                                Function2<Tuple4<Double, Double, Double, Double>, Tuple4<Double, Double, Double, Double>, Tuple4<Double, Double, Double, Double>>() {
                                                                                                                                                    @Override
                                                                                                                                                    public Tuple4<Double, Double, Double, Double> call(Tuple4<Double, Double, Double, Double> v1, Tuple4<Double, Double, Double, Double> v2) throws Exception {
                                                                                                                                                        // 对相同键的两个值进行合并，这里假设数据类型是 Integer 类型，根据实际情况调整
                                                                                                                                                        Double mergedData1 = (v1._1() != null) ? v1._1() : v2._1();
                                                                                                                                                        Double mergedData2 = (v1._2() != null) ? v1._2() : v2._2();
                                                                                                                                                        Double mergedData3 = (v1._3() != null) ? v1._3() : v2._3();
                                                                                                                                                        Double mergedData4 = (v1._4() != null) ? v1._4() : v2._4();
                                                                                                                                                        return new Tuple4<>(mergedData1, mergedData2, mergedData3, mergedData4);
                                                                                                                                                    }
                                                                                                                                                });
            // 最终得到合并后的 JavaRDD
            JavaRDD<TobjLdHour> resultRDD = mergedRDD.map(data -> {
                TobjLdHour resultData = new TobjLdHour();
                resultData.setF_measurement_points(data._1()._1());
                resultData.setF_data_collection_time(data._1()._2());
                resultData.setF_key_name(data._1._3());
                resultData.setF_data_input_time(data._1._4());
                resultData.setF_delete(0L);
                resultData.setData_vs(data._2()._1());
                resultData.setData_va(data._2()._2());
                resultData.setData_vb(data._2()._3());
                resultData.setData_vc(data._2()._4());
                resultData.setF_data_input_time(DateUtil.date().toDateStr());
//                resultData.setF_key_name(data._1.get);
                return resultData;
            });

            Dataset<TobjLdHour> dataset = spark.createDataset(resultRDD.rdd(), Encoders.bean(TobjLdHour.class));

            dataset.show();
            log.info("保存前进行删除操作，");
            Connection connection = JDBCUtils.getConnection(url, username, password);
            StringBuilder deleteSql = new StringBuilder("delete from " + dwsTable + " where f_data_collection_time <='" + startDate + "' and f_data_collection_time >='" + endDate + "'");
            deleteSql.append(" and f_key_name in ('10128E89')");
            log.info("删除的语句为：{}", deleteSql);
            boolean execute = connection.createStatement().execute(deleteSql.toString());
            log.info("删除结果为：{}", execute);
            dataset.write()
                    .format("jdbc")
                    .option("url", url)
                    .option("dbtable", dwsTable)
                    .option("user", username)
                    .option("password", password)
                    .mode(SaveMode.Append)
                    .save();


        } catch (Exception e) {
            e.printStackTrace();
            log.error("出现错误：{}", e.getMessage());
        }

        log.info("=================开始查询E_MP_U_CURVE 结束===================");
    }


    /**
     * 组装数据
     *
     * @param row
     * @param dwsTable
     * @return
     */
    public static List<TobjLdHour> convertRowToDevTimeData(Row row, String dwsTable) {
        List<TobjLdHour> devTimeDataList = new ArrayList<>();
        Result result = tableGetResult(row, dwsTable);

        String string = getRowObject(row, "dev_id").toString();

        Object dataDate = getRowObject(row, "ds");
        Object phaseFlag = getRowObject(row, "phase_flag");
        // 从 u0000 到 u2359 的字段
        for (int i = 0; i <= 2359; i++) {
            String columnName = result.getType() + String.format("%04d", i);
            Object dataDouble = getRowObject(row, columnName);

//            devTimeData.addData(columnName, value);
            if (ObjectUtil.isNotNull(dataDouble)) {
                if (NumberUtil.isNumber(dataDouble.toString())) {
                    log.info("columnName为：{},dataDouble:{}", columnName, dataDouble);
                    TobjLdHour devTimeData = new TobjLdHour();
                    devTimeData.setF_measurement_points(Long.valueOf(string));
                    devTimeData.setF_data_input_time(DateUtil.date().toDateStr());
                    tobjHourVaSetAvlite(result, devTimeData, dataDouble);
                    String parseDate = DateUtil.parse(dataDate.toString(), "yyyyMMdd").toDateStr() + " " + convertUToTime(columnName);
                    devTimeData.setF_data_collection_time(parseDate);
                    devTimeData.setF_delete(0L);
                    devTimeData.setF_key_name(result.getKeyName());
                    devTimeDataList.add(devTimeData);
                }
            }

        }

        return devTimeDataList;
    }


    /**
     * 将不同数据放入不同实体类中的字段上
     *
     * @param result
     * @param tobjLdHour
     * @param dataDouble
     */
    private static void tobjHourVaSetAvlite(Result result, TobjLdHour tobjLdHour, Object dataDouble) {
        switch (result.getPhaseFlag()) {
            case S10128E89:
                tobjLdHour.setData_vs(Double.parseDouble(dataDouble.toString()));
                break;
            case A10128E89:
                tobjLdHour.setData_va(Double.parseDouble(dataDouble.toString()));
                break;
            case B10128E89:
                tobjLdHour.setData_vb(Double.parseDouble(dataDouble.toString()));
                break;
            case C10128E89:
                tobjLdHour.setData_vc(Double.parseDouble(dataDouble.toString()));
                break;
        }
    }


    private static Result tableGetResult(Row data, String table) {
        // 在这里需要重新获取 表判断的字段
        String phaseFlag = (String) getRowObject(data, "phase_flag");
        String dataType = (String) getRowObject(data, "data_type");
        if (table.equals(EMPUCURVE)) {
            return new Result(phaseFlag, E215, "u", null);
        } else if (table.equals(EMPPFCURVE)) {
            return new Result(phaseFlag, E89, "c", null);
        } else if (table.equals(EMPICURVE)) {
//            joinPirxd(sqlDataBuild, phaseFlag);
            return new Result(phaseFlag, E25, "i", null);
        } else if (table.equals(EMPPCURVE)) {
            if (dataType != null) {
                if ((dataType.equals(S10128E49) || dataType.equals(A10128E49)) || dataType.equals(B10128E49) || dataType.equals(C10128E49)) {
                    return new Result(dataType, E49, "p", null);
                } else if ((dataType.equals(S10128E59) || dataType.equals(A10128E59)) || dataType.equals(B10128E59) || dataType.equals(C10128E59)) {
                    return new Result(dataType, E59, "p", null);
                } else if ((dataType.equals(S10128E39) || dataType.equals(A10128E39)) || dataType.equals(B10128E39) || dataType.equals(C10128E39)) {
                    return new Result(dataType, E39, "p", null);
                }
            }
        } else {
            return null;
        }
        return null;
    }


    /**
     * 取出数据
     *
     * @param data
     * @param point
     * @return
     */
    private static Object getRowObject(Row data, String point) {
        if (!data.schema().getFieldIndex(point).toList().isEmpty() &&
                ObjectUtil.isNotNull(data.getAs(point))) {
            return data.getAs(point);
        } else {
            return null;
        }
    }

}
